/**
 * Copyright (c) 2014-2015, GoBelieve     
 * All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

package main

import "net"
import "time"
import "unsafe"
import "sync"
import "sync/atomic"
import log "github.com/sirupsen/logrus"
import "github.com/gorilla/websocket"
import "container/list"

const (
	// CLIENT_TIMEOUT is the client inactivity timeout, expressed as a
	// number of seconds (6 minutes).
	CLIENT_TIMEOUT = 60 * 6

	// MESSAGE_QUEUE_LIMIT caps the number of messages waiting to be sent.
	MESSAGE_QUEUE_LIMIT = 300

	// MESSAGE_QUEUE_BLOCK_LIMIT caps the number of queued messages while the
	// socket is in the blocking state; at that point the socket may already
	// have been closed abnormally by the peer.
	MESSAGE_QUEUE_BLOCK_LIMIT = 30
)

// Conn is the transport abstraction used by Connection: something that can
// be closed, given read/write deadlines, and that reads and writes whole
// *Message values. NetConn and WSConn in this file provide the
// ReadMessage/WriteMessage pair and embed a concrete connection type.
type Conn interface {
	// Close closes the underlying connection.
	Close() error
	
	// SetReadDeadline sets the deadline for subsequent reads.
	SetReadDeadline(t time.Time) error
	// SetWriteDeadline sets the deadline for subsequent writes.
	SetWriteDeadline(t time.Time) error
	
	// ReadMessage reads one message from the connection.
	ReadMessage()(*Message, error)
	// WriteMessage writes one message to the connection.
	WriteMessage(msg *Message) error
}


// NetConn wraps a net.Conn (embedded, so its methods are promoted) and adds
// message-level read/write methods.
type NetConn struct {
	net.Conn
}


// ReadMessage reads one message from the embedded net.Conn by delegating to
// ReceiveClientMessage, returning its result unchanged.
func (conn *NetConn) ReadMessage()(*Message, error) {
	return ReceiveClientMessage(conn.Conn)
}

// WriteMessage writes msg to the embedded net.Conn by delegating to
// SendMessage, returning its error unchanged.
func (conn *NetConn) WriteMessage(msg *Message) error {
	return SendMessage(conn.Conn, msg)	
}

// WSConn wraps a *websocket.Conn (embedded, so its methods are promoted) and
// adds message-level read/write methods.
type WSConn struct {
	*websocket.Conn
}


// ReadMessage reads one message from the embedded websocket connection by
// delegating to ReadWebsocketMessage, returning its result unchanged.
func (ws *WSConn) ReadMessage()(*Message, error) {
	return ReadWebsocketMessage(ws.Conn)
}

// WriteMessage writes msg to the embedded websocket connection by delegating
// to SendWebsocketBinaryMessage, returning its error unchanged.
func (ws *WSConn) WriteMessage(msg *Message) error {
	return SendWebsocketBinaryMessage(ws.Conn, msg)	
}



// Connection holds the per-client transport, identity and outbound-queue
// state. Messages reach the client through three paths visible in this file:
// the wt channel (EnqueueMessage), the pwt channel (EnqueueMessages) and the
// mutex-protected messages list (EnqueueNonBlockContinueMessage), whose
// updates are signalled on lwt.
//
// NOTE(review): Client() casts *Connection to *Client with unsafe.Pointer,
// so the field layout here is presumably load-bearing — confirm before
// reordering fields.
type Connection struct {
	conn   Conn  // underlying transport
	closed int32 // read atomically; a value > 0 makes the Enqueue* methods refuse new messages
	
	forbidden int32 // whether the user is muted
	notification_on bool // whether to notify the mobile client while the desktop client is online
	online bool

	sync_count int64 // peer-to-peer message sync counter, used to tell whether this is the first sync
	tc     int32 // write timeout/error count, accessed atomically; > 0 marks the connection as blocked
	blocking int32 // accessed atomically; set to 1 by send() while a write has been running for over 10ms
	
	wt     chan *Message // single messages queued by EnqueueMessage
	lwt    chan int // signalled (non-blocking) after messages is appended to
	// offline messages
	pwt    chan []*Message

	sequence int // sequence number stamped on each sent message
	// client protocol version number
	version int

	tm     time.Time
	appid  int64
	uid    int64
	device_id string
	device_ID int64 //generated by device_id + platform_id
	platform_id int8
	
	messages *list.List // FIFO queue of messages waiting to be sent
	mutex  sync.Mutex // guards messages
}


// Client reinterprets the receiver's address as a *Client through
// unsafe.Pointer; no data is copied.
//
// NOTE(review): this is presumably only valid when the Connection is the
// first field of a Client value — confirm against the Client definition.
func (client *Connection) Client() *Client {
	return (*Client)(unsafe.Pointer(client))
}

// isSender reports whether this connection is the sender of msg.
//
// For MSG_IM and MSG_GROUP_IM the message's sender must equal the
// connection's uid; for MSG_CUSTOMER_V2 the sender appid must additionally
// equal the connection's appid. In both cases device_id must equal the
// connection's device_ID. Any other command yields false.
func (client *Connection) isSender(msg *Message, device_id int64) bool {
	switch msg.cmd {
	case MSG_IM, MSG_GROUP_IM:
		im := msg.body.(*IMMessage)
		return im.sender == client.uid && device_id == client.device_ID
	case MSG_CUSTOMER_V2:
		cm := msg.body.(*CustomerMessageV2)
		return cm.sender_appid == client.appid &&
			cm.sender == client.uid &&
			device_id == client.device_ID
	}
	return false
}

// SendGroupMessage sends a super-group message: msg is handed to
// PublishGroupMessage for the group's gid under this connection's appid, and
// then to DispatchMessageToGroup together with this connection's Client.
func (client *Connection) SendGroupMessage(group *Group, msg *Message) {
	app, gid := client.appid, group.gid

	PublishGroupMessage(app, gid, msg)
	DispatchMessageToGroup(msg, group, app, client.Client())
}


// SendMessage sends msg to the user uid under this connection's appid: the
// message is handed to PublishMessage and then to DispatchMessageToPeer
// together with this connection's Client. It always returns true.
func (client *Connection) SendMessage(uid int64, msg *Message) bool {
	app := client.appid

	PublishMessage(app, uid, msg)
	DispatchMessageToPeer(msg, uid, app, client.Client())

	return true
}

// EnqueueNonBlockContinueMessage appends msg and, when it is non-nil,
// sub_msg to the connection's pending-message list without ever blocking the
// caller, then signals lwt (non-blocking) that the list has new content.
//
// The list is bounded: MESSAGE_QUEUE_LIMIT normally, or
// MESSAGE_QUEUE_BLOCK_LIMIT while a socket write is flagged as blocking.
// When the bound is reached the oldest queued message is dropped to make
// room for each new one.
//
// It returns false, without queuing anything, when the connection is closed
// or has recorded a write timeout/error (tc > 0); otherwise it returns true.
func (client *Connection) EnqueueNonBlockContinueMessage(msg *Message, sub_msg *Message) bool {
	if atomic.LoadInt32(&client.closed) > 0 {
		log.Infof("can't send message to closed connection:%d", client.uid)
		return false
	}

	if atomic.LoadInt32(&client.tc) > 0 {
		log.Infof("can't send message to blocked connection:%d", client.uid)
		atomic.AddInt32(&client.tc, 1)
		return false
	}

	queue_limit := MESSAGE_QUEUE_LIMIT
	if atomic.LoadInt32(&client.blocking) != 0 {
		queue_limit = MESSAGE_QUEUE_BLOCK_LIMIT
	}

	dropped := false
	// push must only be called with client.mutex held.
	push := func(m *Message) {
		if client.messages.Len() >= queue_limit {
			// queue is jammed: drop the oldest message
			client.messages.Remove(client.messages.Front())
			dropped = true
		}
		client.messages.PushBack(m)
	}

	client.mutex.Lock()
	push(msg)
	if sub_msg != nil {
		push(sub_msg)
	}
	// Sample the length while still holding the lock; container/list is not
	// safe for concurrent use, so reading it after Unlock would be a race.
	queued := client.messages.Len()
	client.mutex.Unlock()

	if dropped {
		log.Info("message queue full, drop a message")
	}

	if queued > 50 {
		log.Warningf("message queue jam, connection:%d", client.uid)
	}

	// Non-blocking wake-up: if a signal is already pending there is nothing
	// more to do.
	select {
	case client.lwt <- 1:
	default:
	}

	return true
}


// EnqueueNonBlockMessage queues a single message without blocking; it is
// EnqueueNonBlockContinueMessage with no follow-up message, and returns that
// call's result.
func (client *Connection) EnqueueNonBlockMessage(msg *Message) bool {
	return client.EnqueueNonBlockContinueMessage(msg, nil)
}


// EnqueueMessage hands msg to the wt channel, waiting up to 60 seconds for
// the channel to accept it.
//
// It returns false immediately when the connection is closed or has already
// recorded a write timeout/error (tc > 0), and false after the 60 second
// wait expires (which also increments tc). It returns true once the message
// has been accepted by the channel.
func (client *Connection) EnqueueMessage(msg *Message) bool {
	if atomic.LoadInt32(&client.closed) > 0 {
		log.Infof("can't send message to closed connection:%d", client.uid)
		return false
	}

	if atomic.LoadInt32(&client.tc) > 0 {
		log.Infof("can't send message to blocked connection:%d", client.uid)
		atomic.AddInt32(&client.tc, 1)
		return false
	}

	// An explicit timer that is stopped on return, rather than time.After,
	// so a successful send does not leave a 60s timer pending.
	timeout := time.NewTimer(60 * time.Second)
	defer timeout.Stop()

	select {
	case client.wt <- msg:
		return true
	case <-timeout.C:
		atomic.AddInt32(&client.tc, 1)
		log.Infof("send message to wt timed out:%d", client.uid)
		return false
	}
}


// EnqueueMessages hands a batch of messages to the pwt channel, waiting up
// to 60 seconds for the channel to accept it.
//
// It returns false immediately when the connection is closed or has already
// recorded a write timeout/error (tc > 0), and false after the 60 second
// wait expires (which also increments tc). It returns true once the batch
// has been accepted by the channel.
func (client *Connection) EnqueueMessages(msgs []*Message) bool {
	if atomic.LoadInt32(&client.closed) > 0 {
		log.Infof("can't send messages to closed connection:%d", client.uid)
		return false
	}

	if atomic.LoadInt32(&client.tc) > 0 {
		log.Infof("can't send messages to blocked connection:%d", client.uid)
		atomic.AddInt32(&client.tc, 1)
		return false
	}

	// An explicit timer that is stopped on return, rather than time.After,
	// so a successful send does not leave a 60s timer pending.
	timeout := time.NewTimer(60 * time.Second)
	defer timeout.Stop()

	select {
	case client.pwt <- msgs:
		return true
	case <-timeout.C:
		atomic.AddInt32(&client.tc, 1)
		log.Infof("send messages to pwt timed out:%d", client.uid)
		return false
	}
}

// read sets a read deadline CLIENT_TIMEOUT seconds from now and reads the
// next message from the connection. The errors from setting the deadline and
// from the read are discarded; only the message returned by ReadMessage is
// passed back to the caller.
func (client *Connection) read() *Message {
	deadline := time.Now().Add(CLIENT_TIMEOUT * time.Second)
	client.conn.SetReadDeadline(deadline)

	received, _ := client.conn.ReadMessage()
	return received
}

// send writes m to the connection with a 30 second write deadline.
//
// The connection's sequence counter is incremented and stamped on the
// outgoing message. When m's version differs from the connection's version a
// copy carrying the connection's version is sent instead; otherwise m itself
// is stamped and sent.
//
// Nothing is written when the connection has already recorded a write
// timeout/error (tc > 0). A failed write increments tc.
//
// While the write is in progress a 10ms timer runs; if it fires before the
// write returns, the blocking flag is set to 1 so that
// EnqueueNonBlockContinueMessage applies the smaller queue limit, and the
// flag is cleared again once the write has returned.
func (client *Connection) send(m *Message) {
	client.sequence += 1
	msg := m
	if msg.version != client.version {
		msg = &Message{
			cmd:     m.cmd,
			seq:     m.seq,
			version: client.version,
			flag:    m.flag,
			body:    m.body,
		}
	}
	msg.seq = client.sequence

	// Check for a dead socket before arming the timer: returning with the
	// timer still running would let it fire later and leave the blocking
	// flag stuck at 1.
	if atomic.LoadInt32(&client.tc) > 0 {
		log.Info("can't write data to blocked socket")
		return
	}

	complete_c := make(chan int, 1)
	block := func() {
		//running in other goroutine, must do very little work
		atomic.StoreInt32(&client.blocking, 1)
		complete_c <- 1
	}
	timer := time.AfterFunc(10*time.Millisecond, block)

	client.conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
	err := client.conn.WriteMessage(msg)
	if err != nil {
		atomic.AddInt32(&client.tc, 1)
		log.Info("send msg:", Command(msg.cmd), " tcp err:", err)
	}

	if !timer.Stop() {
		// The timer already fired, so block is running or has run.
		log.Info("send message blocked")
		// Wait for block to finish so its store of 1 cannot land after the
		// reset below.
		<-complete_c
		atomic.StoreInt32(&client.blocking, 0)
	}
}


// close closes the underlying transport. The error returned by Close is
// deliberately discarded.
func (client *Connection) close() {
	_ = client.conn.Close()
}
